1b5576
@@ -19,20 +19,31 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -50,10 +61,15 @@
import org.apache.zookeeper.KeeperException;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class Import {
+  private static final Log LOG = LogFactory.getLog(Import.class);
   final static String NAME = "import";
   final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
   final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
-  private static final Log LOG = LogFactory.getLog(Import.class);
+  final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
+  final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
+
+  // Optional filter to use for mappers
+  private static Filter filter;
 
   /**
    * A mapper that just writes out KeyValues.
@@ -76,6 +92,10 @@
public class Import {
     throws IOException {
       try {
         for (KeyValue kv : value.raw()) {
+          kv = filterKv(kv);
+          // skip if we filtered it out
+          if (kv == null) continue;
+
           context.write(row, convertKv(kv, cfRenameMap));
         }
       } catch (InterruptedException e) {
@@ -86,6 +106,7 @@
public class Import {
     @Override
     public void setup(Context context) {
       cfRenameMap = createCfRenameMap(context.getConfiguration());
+      filter = instantiateFilter(context.getConfiguration());
     }
   }
 
@@ -121,6 +142,10 @@
public class Import {
       Put put = null;
       Delete delete = null;
       for (KeyValue kv : result.raw()) {
+        kv = filterKv(kv);
+        // skip if we filter it out
+        if (kv == null) continue;
+
         kv = convertKv(kv, cfRenameMap);
         // Deletes and Puts are gathered and written when finished
         if (kv.isDelete()) {
@@ -149,6 +174,8 @@
public class Import {
     public void setup(Context context) {
       Configuration conf = context.getConfiguration();
       cfRenameMap = createCfRenameMap(conf);
+      filter = instantiateFilter(conf);
+
       try {
         HConnection connection = HConnectionManager.getConnection(conf);
         ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
@@ -165,6 +192,77 @@
public class Import {
     }
   }
 
+  /**
+   * Build the optional {@link Filter} named by {@link #FILTER_CLASS_CONF_KEY}. The filter is
+   * created reflectively through the class's static <tt>createFilterFromArguments(ArrayList)</tt>
+   * method, which is passed the arguments read from {@link #FILTER_ARGS_CONF_KEY}.
+   * @param conf {@link Configuration} from which to load the filter class and its arguments
+   * @return the filter to use for the task, or <tt>null</tt> if no filter class is configured
+   * @throws RuntimeException wrapping the reflection failure if the filter cannot be created
+   */
+  private static Filter instantiateFilter(Configuration conf) {
+    Class<? extends Filter> configured = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+    if (configured == null) {
+      // nothing configured, so the import runs unfiltered
+      LOG.debug("No configured filter class, accepting all keyvalues.");
+      return null;
+    }
+    LOG.debug("Attempting to create filter:" + configured);
+
+    // every reflection failure is reported identically: logged, then rethrown unchecked
+    Exception failure;
+    try {
+      Method factory = configured.getMethod("createFilterFromArguments", ArrayList.class);
+      return (Filter) factory.invoke(null, getFilterArgs(conf));
+    } catch (IllegalAccessException e) {
+      failure = e;
+    } catch (SecurityException e) {
+      failure = e;
+    } catch (NoSuchMethodException e) {
+      failure = e;
+    } catch (IllegalArgumentException e) {
+      failure = e;
+    } catch (InvocationTargetException e) {
+      failure = e;
+    }
+    LOG.error("Couldn't instantiate filter!", failure);
+    throw new RuntimeException(failure);
+  }
+
+  /** Reads the filter args from conf, single-quoted as the filter factory methods expect. */
+  private static ArrayList<byte[]> getFilterArgs(Configuration conf) {
+    // fall back to no args; the one-arg getStrings returns null for an unset key (NPE below)
+    String[] sargs = conf.getStrings(FILTER_ARGS_CONF_KEY, new String[0]);
+    ArrayList<byte[]> args = new ArrayList<byte[]>(sargs.length);
+    for (String arg : sargs) {
+      args.add(Bytes.toBytes("'" + arg + "'"));
+    }
+    return args;
+  }
+
+  /**
+   * Apply the configured filter, if any, to a single {@link KeyValue}.
+   * @param kv {@link KeyValue} on which to apply the filter
+   * @return <tt>null</tt> if the filter rejected the key and it should not be written,
+   *         otherwise the original {@link KeyValue} (always the case when no filter is set)
+   */
+  private static KeyValue filterKv(KeyValue kv) {
+    if (filter == null) {
+      return kv;
+    }
+    Filter.ReturnCode code = filter.filterKeyValue(kv);
+    // only the two INCLUDE codes accept the kv; every other decision skips it
+    if (code.equals(Filter.ReturnCode.INCLUDE)
+        || code.equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL)) {
+      return kv;
+    }
+    // runs once per KeyValue: report through the debug log, never straight to stdout
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Skipping key: " + kv + " from filter decision: " + code);
+    }
+    return null;
+  }
+
   // helper: create a new KeyValue based on CF rename map
   private static KeyValue convertKv(KeyValue kv, Map<byte[], byte[]> cfRenameMap) {
     if(cfRenameMap != null) {
@@ -244,13 +342,33 @@
public class Import {
     }
     conf.set(CF_RENAME_PROP, sb.toString());
   }
-  
+
+  /**
+   * Register a {@link Filter} class, and the arguments to build it with, on a configuration.
+   * The values are stored under {@link #FILTER_CLASS_CONF_KEY} and
+   * {@link #FILTER_ARGS_CONF_KEY}, the keys the import mappers read when building the filter.
+   * @param conf Configuration to update (will be passed to the job)
+   * @param clazz {@link Filter} subclass whose class name is recorded
+   * @param args arguments for the filter; recorded as a single comma-separated string
+   */
+  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
+      List<String> args) {
+    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
+
+    // join the arguments with commas; the separator is empty only before the first one
+    StringBuilder joined = new StringBuilder();
+    String separator = "";
+    for (String arg : args) {
+      joined.append(separator).append(arg);
+      separator = ",";
+    }
+    conf.set(Import.FILTER_ARGS_CONF_KEY, joined.toString());
+  }
 
   /**
    * Sets up the actual job.
-   *
-   * @param conf  The current configuration.
-   * @param args  The command line parameters.
+   * @param conf The current configuration.
+   * @param args The command line parameters.
    * @return The newly created job.
    * @throws IOException When setting up the job fails.
    */
@@ -263,6 +381,17 @@
public class Import {
     FileInputFormat.setInputPaths(job, inputDir);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+
+    // make sure we get the filter in the jars
+    try {
+      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+      if (filter != null) {
+        TableMapReduceUtil.addDependencyJars(conf, filter);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
     if (hfileOutPath != null) {
       job.setMapperClass(KeyValueImporter.class);
       HTable table = new HTable(conf, tableName);
@@ -295,6 +424,15 @@
public class Import {
     System.err.println("By default Import will load data directly into HBase. To instead generate");
     System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
     System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err
+        .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
+    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
+    System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
+    System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
+        + CF_RENAME_PROP + " property. Futher, filters will only use the"
+        + "Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+        + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including "
+        + "the KeyValue.");
     System.err.println("For performance consider the following options:\n"
         + "  -Dmapred.map.tasks.speculative.execution=false\n"
         + "  -Dmapred.reduce.tasks.speculative.execution=false");
